Skip to content

[BP-2.0][FLINK-37663][connector-base] Fix lost wakeup in FutureCompletingBlockingQueue#28470

Open
MartijnVisser wants to merge 2 commits into
apache:release-2.0from
MartijnVisser:backport-FLINK-37663-release-2.0
Open

[BP-2.0][FLINK-37663][connector-base] Fix lost wakeup in FutureCompletingBlockingQueue#28470
MartijnVisser wants to merge 2 commits into
apache:release-2.0from
MartijnVisser:backport-FLINK-37663-release-2.0

Conversation

@MartijnVisser

Copy link
Copy Markdown
Contributor

What is the purpose of the change

Backport of #28463 to release-2.0.

FutureCompletingBlockingQueue (the split-fetcher to source-reader handover) has a lost-wakeup bug. A putter that blocks in put() registers its Condition in the internal notFull queue, but that condition is not always removed when the putter stops waiting:

  • when the putter is gracefully woken via wakeUpPuttingThread(int) (and returns false without enqueuing), and
  • when the putter's await() returns spuriously (permitted by the Condition contract) and it re-blocks, re-adding its condition.

The leftover (stale or duplicated) entry makes a later signalNextPutter(), fired when a consumer frees a slot, signal a thread that is no longer waiting. A genuinely waiting putter is then never woken, so a split fetcher can stall indefinitely.

Brief change log

  • FutureCompletingBlockingQueue#waitOnPut: remove the putter's condition from notFull once await() returns (in a finally), restoring the invariant that notFull only holds conditions of currently waiting putters.
  • Added a @VisibleForTesting accessor (getNumberOfQueuedPutters()) so the regression test can deterministically sequence the two contending putters. Like the existing @VisibleForTesting take() in the same class, this trips the blanket connector "depend only on public API" ArchUnit rule, so it is recorded in the flink-architecture-tests-production violation store alongside the existing entry.
  • Added FutureCompletingBlockingQueueTest#testWakeUpDoesNotStrandAnotherPutter, a deterministic regression test that reproduces the stranded-putter stall and fails on the unfixed code.

Verifying this change

This change added tests and can be verified as follows:

  • FutureCompletingBlockingQueueTest#testWakeUpDoesNotStrandAnotherPutter blocks two putters, wakes the first gracefully, then frees one slot and asserts the second (still-waiting) putter is signalled. It fails on the code before the fix (the still-waiting putter is stranded, the latch times out) and passes after the fix.
  • Existing tests in the module remain green.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no (FutureCompletingBlockingQueue is @Internal)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

Was generative AI tooling used to co-author this PR?
  • Yes (Claude Code, Claude Opus 4.8)

Generated-by: Claude Code (Claude Opus 4.8)

…ockingQueue lost wakeup

A freed slot signals a putter that wakeUpPuttingThread() already released instead of the genuinely waiting putter; the test reproduces the stall and fails without the fix. Its @VisibleForTesting accessor is recorded in the architecture violation store, like the existing take().

Generated-by: Claude Code (Claude Opus 4.8)
… wakeup

Remove a blocked putter's condition from the notFull queue when its await() returns, so signalNextPutter() no longer signals a putter that has stopped waiting after a graceful wakeUpPuttingThread() or a spurious wakeup.

Generated-by: Claude Code (Claude Opus 4.8)
@flinkbot

flinkbot commented Jun 17, 2026

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@spuru9 spuru9 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clean Backport

@raminqaf raminqaf left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@github-actions github-actions Bot added target:release-2.0 community-reviewed-LGTM Applied if there are 2 non-committer approves on a PR. (The submitter cannot approve their own PR.) labels Jun 17, 2026
@MartijnVisser

Copy link
Copy Markdown
Contributor Author

@flinkbot run azure

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed-LGTM Applied if there are 2 non-committer approves on a PR. (The submitter cannot approve their own PR.) target:release-2.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants